Context summarization feature implementation #3621
Conversation
This looks really great! Very clean and easy to understand. Nice work 👏
I'm going to do some testing and will let you know if I find anything.
@@ -0,0 +1,307 @@
# Code Cleanup Skill
I think this is fine to start. We will probably want to iterate until we land on something optimized.
markbackman left a comment
LGTM! Thanks for the clean up.
From the first review, the only item I still see open is this one (regarding the base summary prompt):
https://github.com/pipecat-ai/pipecat/pull/3621/changes#r2775817727
Aside from that, this looks good to go. It probably makes sense to get input from someone else too since this is such a key feature and will get a ton of use.
I had missed that one. Fixed. Thank you for the review @markbackman. 🙌
class LLMContextSummaryRequestFrame(ControlFrame):
    """Frame requesting context summarization from an LLM service.

    Sent by aggregators to LLM services when conversation context needs to be
Might also be worth describing what the LLMs then do with that summary (i.e. push an LLMContextSummaryResultFrame, right?)
I was describing what they do inside LLMContextSummaryResultFrame.
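For orientation, here is a minimal sketch of what this request/result frame pair might look like; the field names are taken from the diff and the discussion in this thread, and the exact definitions in the PR may differ.

```python
from dataclasses import dataclass
from typing import Any

from pipecat.frames.frames import ControlFrame


@dataclass
class LLMContextSummaryRequestFrame(ControlFrame):
    """Sketch: sent by the aggregator to ask the LLM service for a summary."""

    request_id: str
    context: Any  # the full LLM context containing the messages to summarize
    min_messages_to_keep: int  # recent messages to preserve uncompressed
    max_context_tokens: int  # max_tokens enforced when running inference


@dataclass
class LLMContextSummaryResultFrame(ControlFrame):
    """Sketch: pushed back by the LLM service once the summary is generated."""

    request_id: str
    summary: str
    last_summarized_index: int  # last message index covered by the summary
```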
src/pipecat/frames/frames.py
Outdated
context: The full LLM context containing all messages to analyze and summarize.
min_messages_to_keep: Number of recent messages to preserve uncompressed.
    These messages will not be included in the summary.
max_context_tokens: Maximum allowed context size in tokens. The LLM should
Feedback applies throughout: should we be transparent about the fact that these are approximate tokens, maybe by calling it something like max_approx_context_tokens (and updating the docstring comments accordingly so developers know what they're dealing with)?
I think the name max_context_tokens is correct here, because this is what we pass to the LLM when running inference to enforce the maximum number of tokens.
For the one inside LLMContextSummarizationConfig, what the user specifies is only an approximation, since the way we calculate the tokens is approximate. Even so, in my opinion we should keep the same name there, but make it clear in the docstring that the token calculation is an approximation.
What do you think?
Ok, for now I have improved the description of max_context_tokens, inside LLMContextSummarizationConfig, explaining how the tokens are calculated.
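For readers wondering what "approximate" means in practice, a common heuristic is roughly four characters per token; the sketch below is illustrative only and not necessarily the estimate used by this PR's LLMContextSummarizationUtil.

```python
import json
from typing import Any, Dict, List


def estimate_context_tokens(messages: List[Dict[str, Any]]) -> int:
    """Rough token estimate: ~4 characters per token of serialized content.

    Illustrative only; the utility in this PR may use a different heuristic.
    """
    total_chars = 0
    for message in messages:
        content = message.get("content", "")
        if not isinstance(content, str):
            content = json.dumps(content)  # structured/multimodal content
        total_chars += len(content) + len(message.get("role", ""))
    return total_chars // 4
```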
@dataclass
class LLMContextSummaryRequestFrame(ControlFrame):
Q: do we want to consume these frames in the LLMs, or let them continue down the pipeline, just in case anyone wants to handle them in custom processors?
My inclination is that these seem fine to consume in the LLMs...
As we discussed in yesterday's meeting: since we're actually handling the frame and doing something with it (creating the summary), it feels more natural to consume the frame here rather than let it continue.
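A rough sketch of the "consume, don't forward" choice, following the usual pipecat FrameProcessor pattern; the class and helper names here are placeholders, not the PR's actual code.

```python
from pipecat.frames.frames import Frame, LLMContextSummaryRequestFrame  # frame added by this PR
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class SummaryHandlingProcessorSketch(FrameProcessor):
    """Sketch: handle summary requests in place instead of pushing them downstream."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMContextSummaryRequestFrame):
            # Consume the frame: handle it here, intentionally no push_frame().
            await self._handle_summary_request(frame)
        else:
            await self.push_frame(frame, direction)

    async def _handle_summary_request(self, frame) -> None:
        """Placeholder: generate the summary and push an LLMContextSummaryResultFrame."""
        ...
```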
    self._params.context_summarization_config or LLMContextSummarizationConfig()
)
self._summarization_in_progress = False
self._pending_summary_request_id: Optional[str] = None
do we need both of the above state variables? could the presence of _pending_summary_request_id indicate that a summarization is in progress?
Yeah, that could work. 👍
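A small sketch of the simplification suggested here, deriving the in-progress flag from the pending request id (names follow the diff above):

```python
from typing import Optional


class SummarizationStateSketch:
    """Sketch: track only the pending request id; derive 'in progress' from it."""

    def __init__(self) -> None:
        self._pending_summary_request_id: Optional[str] = None

    @property
    def summarization_in_progress(self) -> bool:
        return self._pending_summary_request_id is not None

    def start(self, request_id: str) -> None:
        self._pending_summary_request_id = request_id

    def clear(self) -> None:
        self._pending_summary_request_id = None
```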
for s in self._params.user_mute_strategies:
    await s.cleanup()

async def _clear_summarization(self):
nit: _clear_summarization_state might be clearer
Done.
# Apply summary
await self._apply_summary(frame.summary, frame.last_summarized_index)

def _validate_summary_context(self, last_summarized_index: int) -> bool:
Is this function safeguarding against the possibility of programmatic edits to the context, like from LLMMessageUpdateFrames and the like? If so, then in a PR I worked on recently (which maybe you've looked through already) I added some mechanisms for tracking with more certainty whether a context has been edited...wonder if we could join forces and use those here to determine with more certainty whether a summary still applies.
"""
messages = self._context.messages

# Find first system message (if any)
Probably a very fringe case, but should we handle the possibility of the first system message appearing later than the last summarized index? There's technically no hard requirement that a "system"-role message has to appear at or near the beginning of the conversation, esp. with providers like OpenAI.
Or...does the summarization process already exclude the first system message? (I should probably just read on to find out, but wanted to jot this note down here).
When using OpenAI (where "system"-role messages can appear anywhere), forcing the system message to the beginning on summarization might mess with the conversation flow. But on the other hand, summarization does need to work universally, and other providers don't handle "system"-role messages appearing just anywhere...
With Gemini, we "pull" the first system instruction out of the messages and use it as the overall system instruction (which it seems like the logic here is modeled after). But with AWS Bedrock, we only pull a "system"-role message out of messages and use it as the system instruction if it's the first message. We're inconsistent, which isn't ideal...
As I think about it, two approaches come to mind:
- What you have here
- Only checking the very first message in messages for a system message
Almost always, those two are the same. So in practice I don't know if this makes much of a difference.
But it's a good reminder that we should probably do a consistency pass on how we translate "system"-role messages for different providers.
What I am doing in this PR is finding the index of the first system message and defining summary_start = first_system_index + 1.
I then summarize only the messages that come after the first system message, or everything if there is no system message.
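A sketch of that index computation, assuming messages are dicts with a "role" key as in an OpenAI-style context:

```python
from typing import Any, Dict, List, Optional


def summary_start_index(messages: List[Dict[str, Any]]) -> int:
    """Sketch: summarize everything after the first system message,
    or everything if there is no system message."""
    first_system_index: Optional[int] = next(
        (i for i, m in enumerate(messages) if m.get("role") == "system"), None
    )
    return 0 if first_system_index is None else first_system_index + 1
```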
src/pipecat/services/llm_service.py
Outdated
)

# Calculate max_tokens for the summary using utility method
max_summary_tokens = LLMContextSummarizationUtil.calculate_max_summary_tokens(
For my understanding: the max_context_tokens that the developer specifies as the point where a summary should be triggered is the same number used to compute how big the summary should be?
It seems like how big you want the summary to be should be (at least somewhat) independent—you might want to ask the summary to be relatively compact so you don't have to do it as often, rather than letting it take up all the remaining space, no?
It basically is, but when calculating the available space to define the max_tokens that I pass to the LLM, I always apply a 0.8 buffer to keep the summary at a maximum of 80% of the available space.
But I think you're right, we should probably create a target_context_tokens: the target maximum context size in tokens after summarization.
What do you think?
Done. I have created target_context_tokens.
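For intuition, a sketch of the sizing logic described in this thread (the 0.8 buffer plus the new target_context_tokens); the function signature and the min_summary_tokens floor are illustrative assumptions, not the PR's exact utility.

```python
SUMMARY_BUFFER_RATIO = 0.8  # keep the summary to at most 80% of the available space


def calculate_max_summary_tokens(
    target_context_tokens: int,
    kept_messages_tokens: int,
    min_summary_tokens: int = 128,
) -> int:
    """Sketch: bound the summary so the post-summarization context stays under
    target_context_tokens, leaving room for the messages kept verbatim."""
    available = max(target_context_tokens - kept_messages_tokens, 0)
    return max(int(available * SUMMARY_BUFFER_RATIO), min_summary_tokens)
```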
token_limit_exceeded = total_tokens >= token_limit

# Check if we've exceeded max unsummarized messages
messages_since_summary = len(self._context.messages) - 1
nit: do you want to count the possible initial system message towards the limit? if not, you might have to subtract 2, no?
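One way to read this nit, sketched below; whether the initial system message should count toward the limit is exactly the open question, so treat this as illustrative only.

```python
from typing import Any, Dict, List


def messages_since_last_summary(messages: List[Dict[str, Any]]) -> int:
    """Sketch: the first '- 1' mirrors the diff above; the extra decrement
    answers the nit by also excluding an initial system message."""
    count = len(messages) - 1
    if messages and messages[0].get("role") == "system":
        count -= 1
    return count
```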
filter_incomplete_user_turns: bool = False
user_turn_completion_config: Optional[UserTurnCompletionConfig] = None
enable_context_summarization: bool = False
context_summarization_config: Optional[LLMContextSummarizationConfig] = None
What is the reason for adding this into the user aggregator instead of the assistant one? Just curious.
Actually, it seems this should be done in the assistant aggregator; that feels more natural, I think.
I decided to include it there because each time the user started a new turn and we pushed a new context frame, it seemed like a good moment to also push, as a follow-up, a frame requesting summarization if needed.
This way, the LLM would have time to process it while the TTS was generating the previous answer.
So it felt like a good spot to add this logic without impacting performance.
But as we discussed on Slack, we can achieve something similar using the assistant aggregator, if we use the LLMFullResponseStartFrame to decide whether or not to request the context summarization.
Done.
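A sketch of the agreed trigger point: check on LLMFullResponseStartFrame in the assistant aggregator whether a summarization request should be pushed. The class and helper names are placeholders, not the PR's exact methods.

```python
from pipecat.frames.frames import Frame, LLMFullResponseStartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class AssistantAggregatorTriggerSketch(FrameProcessor):
    """Sketch: check whether summarization is needed when the LLM response starts."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMFullResponseStartFrame) and self._should_summarize():
            await self._request_summarization()  # pushes an LLMContextSummaryRequestFrame

        await self.push_frame(frame, direction)

    def _should_summarize(self) -> bool:
        """Placeholder: token/message thresholds, as discussed above."""
        ...

    async def _request_summarization(self) -> None:
        ...
```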
src/pipecat/services/llm_service.py
Outdated
logger.debug(f"{self}: Processing summarization request {frame.request_id}")

# Create a background task to generate the summary without blocking
self.create_task(self._generate_summary_task(frame))
We should save a reference to the task and cancel it on cleanup if necessary. We should also call await asyncio.sleep(0) to schedule the task in the event loop.
src/pipecat/services/llm_service.py
Outdated
logger.debug(f"{self}: Processing summarization request {frame.request_id}")

# Create a background task to generate the summary without blocking
self.create_task(self._generate_summary_task(frame))
We still need to keep track of this task. And, since we don't await it, we need to call await asyncio.sleep(0).
Yep, I have just pushed the fix to keep track of it.
But I am not sure why we need this asyncio.sleep(0)?
I haven't added this one yet. Is it really needed?
If we don't cancel the task during interruption, then no, it's not needed. The reason is that if you cancel a task before the task has been started by the event loop (different from created), you will get RuntimeWarnings saying that the task was never awaited. Since we don't cancel the task during interruptions, I think we should be ok.
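Putting this thread together, a sketch of the pattern with plain asyncio (the PR itself uses the processor's create_task helper): keep a reference to the task, yield once so the event loop schedules it, and cancel on cleanup if needed. Names are illustrative.

```python
import asyncio
from typing import Optional


class SummaryTaskOwnerSketch:
    """Sketch: own the background summarization task so it can be cancelled later."""

    def __init__(self) -> None:
        self._summary_task: Optional[asyncio.Task] = None

    async def start_summary_task(self, coro) -> None:
        self._summary_task = asyncio.create_task(coro)
        # Yield control so the event loop actually starts the task before any
        # later cancellation (avoids "task was never awaited" warnings).
        await asyncio.sleep(0)

    async def cleanup(self) -> None:
        if self._summary_task and not self._summary_task.done():
            self._summary_task.cancel()
            try:
                await self._summary_task
            except asyncio.CancelledError:
                pass
        self._summary_task = None
```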
LGTM! 👏
Summary
- enable_context_summarization=True in LLMUserAggregatorParams with customizable thresholds and behavior

Key Features
Configuration
Testing
Run the new test suite:
Try the examples:
Implementation Details
New Components:
- src/pipecat/utils/context/llm_context_summarization.py: Core utility with token estimation, message selection, and formatting
- LLMContextSummaryRequestFrame and LLMContextSummaryResultFrame: New control frames for async summarization flow
- LLMContextSummarizationConfig: Configuration dataclass with validation

Modified Components:
- LLMUserAggregator: Added summarization trigger logic, state tracking, and result handling
- LLMService: Added async summary generation using run_inference() with max_tokens override
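A hedged usage sketch based on the names in this summary; the import path for LLMContextSummarizationConfig and the exact field names are assumptions and may differ in the merged code.

```python
# Paths and field names for classes added by this PR are assumptions; adjust to the merged layout.
from pipecat.processors.aggregators.llm_response import LLMUserAggregatorParams
from pipecat.utils.context.llm_context_summarization import LLMContextSummarizationConfig

params = LLMUserAggregatorParams(
    enable_context_summarization=True,
    context_summarization_config=LLMContextSummarizationConfig(
        # Illustrative values; thresholds are approximate token counts,
        # per the review discussion above.
        max_context_tokens=8000,       # approximate threshold that triggers summarization
        target_context_tokens=4000,    # target context size after summarization
        min_messages_to_keep=4,        # recent messages preserved uncompressed
    ),
)
```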